package org.msgcenter.mc.actor;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.lang.Validator;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import lombok.extern.slf4j.Slf4j;
import org.msgcenter.mc.common.ConvertUtil;
import org.msgcenter.mc.common.JsonResult;
import org.msgcenter.mc.ds.CompanyWechatMsg;
import org.msgcenter.mc.ds.PushStateMsg;
import org.msgcenter.mc.ds.SiteMsg;

@Slf4j
public class MsgHttpSearchActor extends AbstractVerticle {

  private EventBus eb;

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    super.start(startPromise);

    eb = vertx.eventBus();


    Router router = Router.router(vertx);
    HttpServer server = vertx.createHttpServer();
    server.requestHandler(router)
      .listen(8882)
      .onSuccess(http -> log.info("消息搜索http服务启动成功 监听8882端口"))
      .onFailure(err -> log.error("消息搜索http服务启动失败 {}", err.getMessage()));

    onUpdatePushStateSiteMsgOk();
    onUpdatePushStateSiteMsgErr();
    onUpdatePushStateCompanyWechatMsgOk();
    onUpdatePushStateCompanyWecahtMsgErr();

    onSearchMsgById(router);
  }


  private void onUpdatePushStateSiteMsgOk() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.site_msg.ok");

    consumer.handler(payload -> {
        JsonResult<String, SiteMsg> jr = JSONUtil.toBean(payload.body(), new TypeReference<>() {
        }, true);

        SiteMsg sm = jr.getData();

        // 转换成推送状态结构
        PushStateMsg stateMsg = ConvertUtil.siteMsgToPushStateMsg(sm);
        stateMsg.setMsgState((byte) 1);
        stateMsg.setPushTime((int) (System.currentTimeMillis() / 1000));
        stateMsg.setPushTimeStr(DateUtil.now());
        stateMsg.setErrType((byte) 0);
        stateMsg.setErrCode((byte) 0);
        stateMsg.setErrMsg("");

        // 保存推送消息状态
        eb.send("db.post.update_push_state", JSONUtil.toJsonStr(stateMsg));
      })
      .exceptionHandler(err -> log.error("消息推送状态入库出错 {}", err.getMessage()));
  }

  private void onUpdatePushStateSiteMsgErr() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.site_msg.err");

    consumer.handler(payload -> {
      JsonResult<String, SiteMsg> jr = JSONUtil.toBean(payload.body(), new TypeReference<>() {
      }, true);

      SiteMsg sm = jr.getData();

      // 转换成推送状态结构
      PushStateMsg stateMsg = ConvertUtil.siteMsgToPushStateMsg(sm);
      stateMsg.setMsgState((byte) 1);
      stateMsg.setPushTime((int) (System.currentTimeMillis() / 1000));
      stateMsg.setPushTimeStr(DateUtil.now());
      stateMsg.setErrType((byte) 1);
      stateMsg.setErrCode((byte) 100);
      stateMsg.setErrMsg(jr.getMsg());

      // 保存推送消息状态
      eb.send("db.post.update_push_state", JSONUtil.toJsonStr(stateMsg));
    });
  }

  private void onUpdatePushStateCompanyWechatMsgOk() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.company_wechat_msg.ok");

    consumer.handler(payload -> {
        JsonResult<String, CompanyWechatMsg> jr = JSONUtil.toBean(payload.body(), new TypeReference<>() {
        }, true);

        CompanyWechatMsg cwm = jr.getData();

        // 转换成推送状态结构
        PushStateMsg companyWechatMsg = ConvertUtil.siteMsgToPushStateMsg(cwm);
        companyWechatMsg.setMsgState((byte) 1);
        companyWechatMsg.setPushTime((int) (System.currentTimeMillis() / 1000));
        companyWechatMsg.setPushTimeStr(DateUtil.now());
        companyWechatMsg.setErrType((byte) 0);
        companyWechatMsg.setErrCode((byte) 0);
        companyWechatMsg.setErrMsg("");

        // 保存推送消息状态
        eb.send("db.post.update_push_state", JSONUtil.toJsonStr(companyWechatMsg));
      })
      .exceptionHandler(err -> log.error("消息推送状态入库出错 {}", err.getMessage()));
  }

  private void onUpdatePushStateCompanyWecahtMsgErr() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.company_wechat_msg.err");

    consumer.handler(payload -> {
      JsonResult<String, CompanyWechatMsg> jr = JSONUtil.toBean(payload.body(), new TypeReference<>() {
      }, true);

      CompanyWechatMsg sm = jr.getData();

      // 转换成推送状态结构
      PushStateMsg companyWechatMsg = ConvertUtil.siteMsgToPushStateMsg(sm);
      companyWechatMsg.setMsgState((byte) 1);
      companyWechatMsg.setPushTime((int) (System.currentTimeMillis() / 1000));
      companyWechatMsg.setPushTimeStr(DateUtil.now());
      companyWechatMsg.setErrType((byte) 1);
      companyWechatMsg.setErrCode((byte) 100);
      companyWechatMsg.setErrMsg(jr.getMsg());

      // 保存推送消息状态
      eb.send("db.post.update_push_state", JSONUtil.toJsonStr(companyWechatMsg));
    });
  }

  private void onSearchMsgById(Router router) {
    Route route = router.get("/msg/:msgId");

    route.handler(rc -> {
      String msgId = rc.pathParam("msgId");

      // 检查消息id正确性
      if (!Validator.isNumber(msgId)) {
        JsonResult<String, String> errJr = new JsonResult<>(HttpStatus.HTTP_BAD_REQUEST, "消息id格式出错", msgId);
        rc.end(JSONUtil.toJsonStr(errJr));
        return;
      }

      // 查询pushstate
      eb.request("db.get.select_msg_by_id", Long.valueOf(msgId))
        .onSuccess(res -> rc.response()
          .putHeader("content-type", "application/json;charset=utf-8")
          .end(res.body().toString()))
        .onFailure(err -> {
          JsonResult<String, Object> result = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, err.getMessage(), null);
          rc.response().putHeader("content-type", "application/json;charset=utf-9")
            .end(JSONUtil.toJsonStr(result));
        });
    });
  }

  @Override
  public void stop(Promise<Void> stopPromise) throws Exception {
    super.stop(stopPromise);
  }
}
